from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import datetime,timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Shanghai")
from dagen.operators import SSHOperator

pt= datetime.datetime.now().strftime('%Y%m%d')
#调度周期
schedule_interval="0 1 * * *"

default_args = {
    'owner': 'hgc',  # 拥有者名称
    'depends_on_past': False,   # 是否依赖上一个自己的执行状态
    'email': ['chenghe@tesla.com'],  # 接收通知的email列表
    'email_on_failure': True,  # 是否在任务执行失败时接收邮件
    'email_on_retry': True,  # 是否在任务重试时接收邮件
    'retries': 3 # 失败重试次数
}

with DAG(
    dag_id="pipe_dag",
    start_date=datetime.today() - timedelta(1),
    schedule_interval=None,
    tags=['pipe_dag'],
) as child_dag:
    # [START howto_operator_external_task_sensor]
    pipe_task = ExternalTaskSensor(
        task_id="pipe_task",
        external_dag_id="cdp_try_drive_label_generate_dwd2dws",
        external_task_id="parent_task",
        timeout=600,
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        mode="reschedule"
    )
    # [END howto_operator_external_task_sensor]
    child_task2 = DummyOperator(task_id="child_task2")
    pipe_task >> child_task2




